Conversation
Greptile Summary

This PR introduces a new
Confidence Score: 3/5

Not safe to merge as-is; two P1 bugs in the new `DimSimConnection` affect error diagnostics and memory/cache correctness. The two P1 findings in `dimsim_connection.py` — the stderr/log-reader race that silently drops error output on startup failure, and the class-level `functools.cache` that leaks memory and incorrectly clears other instances' caches — both represent present defects in the changed code that should be fixed before merging. `dimos/robot/unitree/dimsim_connection.py` requires attention for the stderr drain race and the `@functools.cache` instance-method misuse.

Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller
    participant DimSimConnection
    participant subprocess as DimSim Process
    participant LCMThread as LCM Listener Thread
    participant LogReader as Log Reader Thread
    participant StreamThread as Stream Polling Thread
    Caller->>DimSimConnection: start()
    DimSimConnection->>subprocess: Popen(dimsim dev ...)
    DimSimConnection->>LogReader: _start_log_reader() [reads stdout/stderr]
    DimSimConnection->>LCMThread: _start_lcm_listener()
    loop Wait for odom (60s timeout)
        DimSimConnection->>subprocess: process.poll()
        subprocess-->>DimSimConnection: None (still running)
        LCMThread-->>DimSimConnection: _on_lcm_message("/odom") _odom_seq++
    end
    DimSimConnection->>Caller: ready
    Caller->>DimSimConnection: video_stream()
    DimSimConnection->>StreamThread: _create_stream(getter, 20fps)
    loop 20 FPS
        StreamThread-->>Caller: observer.on_next(Image)
    end
    Caller->>DimSimConnection: stop()
    DimSimConnection->>StreamThread: stop_event.set()
    DimSimConnection->>LCMThread: _stop_event.set()
    DimSimConnection->>subprocess: terminate() / kill()
```
Reviews (1): Last reviewed commit: "002"
```python
self._start_log_reader()
self._start_lcm_listener()

# Wait for first odom message as readiness signal.
timeout = 60.0
start_time = time.time()
while time.time() - start_time < timeout:
    if self.process.poll() is not None:
        exit_code = self.process.returncode
        stderr = ""
        if self.process.stderr:
            stderr = self.process.stderr.read().decode(errors="replace")
        self.stop()
        raise RuntimeError(f"DimSim process exited early (code {exit_code})\n{stderr}")
```
stderr already consumed by log reader thread
`_start_log_reader()` launches a daemon thread at line 204 that iterates over `self.process.stderr`. Then, when the early-exit branch is reached (line 211), `self.process.stderr.read()` at line 215 races with that thread — the log reader has likely already drained the pipe, so `stderr` will almost always be empty, making the error message useless for diagnosing why DimSim failed to start.

Either start the log reader after the readiness-wait loop, or buffer the subprocess output with a `deque` in the log reader thread so the error handler can read from that buffer instead of the pipe directly.
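A minimal sketch of the buffering approach. `LogBufferedReader` and its `maxlen` are illustrative names, not part of the PR; the point is that the drain thread keeps a bounded tail of output that the error handler can read after the pipe is empty:

```python
import collections
import threading


class LogBufferedReader:
    """Drains a subprocess pipe on a daemon thread while keeping the
    most recent lines in a bounded deque for later error reporting."""

    def __init__(self, stream, maxlen: int = 200):
        self._tail: collections.deque[str] = collections.deque(maxlen=maxlen)
        self._thread = threading.Thread(target=self._drain, args=(stream,), daemon=True)
        self._thread.start()

    def _drain(self, stream) -> None:
        # iter(readline, b"") ends at EOF, i.e. when the process exits.
        for raw in iter(stream.readline, b""):
            self._tail.append(raw.decode(errors="replace").rstrip())

    def tail(self) -> str:
        # Safe for the error handler: reads the buffered copy,
        # not the already-drained pipe.
        return "\n".join(self._tail)
```

With this in place, the early-exit branch would raise with `reader.tail()` instead of calling `self.process.stderr.read()`, so the diagnostic survives regardless of which thread drained the pipe first.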
```python
@functools.cache
def lidar_stream(self) -> Observable[PointCloud2]:
    def getter() -> PointCloud2 | None:
        if self._lidar_seq > self._last_lidar_seq:
            self._last_lidar_seq = self._lidar_seq
            return self._latest_lidar
        return None

    return self._create_stream(getter, _LIDAR_FPS, "Lidar")

@functools.cache
def odom_stream(self) -> Observable[PoseStamped]:
    def getter() -> PoseStamped | None:
        if self._odom_seq > self._last_odom_seq:
            self._last_odom_seq = self._odom_seq
            return self._latest_odom
        return None

    return self._create_stream(getter, _ODOM_FPS, "Odom")

@functools.cache
def video_stream(self) -> Observable[Image]:
    def getter() -> Image | None:
        if self._image_seq > self._last_image_seq:
            self._last_image_seq = self._image_seq
            return self._latest_image
        return None

    return self._create_stream(getter, _VIDEO_FPS, "Video")
```
@functools.cache on instance methods — class-level cache causes leaks and cross-instance invalidation
`functools.cache` (an alias for `lru_cache(maxsize=None)`) stores its cache on the function object, not on `self`. The result is:
- Memory leak — the cache keeps a strong reference to every `self` that was ever passed as a key, preventing garbage collection.
- `cache_clear()` in `stop()` is global — calling `self.lidar_stream.cache_clear()` clears the entries for all `DimSimConnection` instances, not just the one being stopped.
The standard fix is to store the Observable in an instance dict on first call and reset it in `stop()` instead of relying on `cache_clear()`.
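The instance-dict pattern could look like this sketch. `StreamCacheMixin`, `_cached_stream`, and `_reset_streams` are hypothetical names, not APIs from the PR:

```python
import threading
from typing import Any, Callable


class StreamCacheMixin:
    """Per-instance memoisation for stream factories, replacing
    functools.cache. Entries live on self, so they die with the
    instance, and reset only affects this instance."""

    def __init__(self) -> None:
        self._stream_cache: dict[str, Any] = {}
        self._stream_cache_lock = threading.Lock()

    def _cached_stream(self, name: str, factory: Callable[[], Any]) -> Any:
        with self._stream_cache_lock:
            if name not in self._stream_cache:
                # Factory runs at most once per name per instance.
                self._stream_cache[name] = factory()
            return self._stream_cache[name]

    def _reset_streams(self) -> None:
        # Called from stop(): clears only this instance's cache.
        with self._stream_cache_lock:
            self._stream_cache.clear()
```

A method would then read, for example, `return self._cached_stream("lidar", lambda: self._create_stream(getter, _LIDAR_FPS, "Lidar"))`, keeping the memoisation without any class-level state.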
```python
if self.process:
    if self.process.stderr:
        self.process.stderr.close()
```
stdout pipe is never closed in stop()
`stop()` closes `self.process.stderr` but leaves `self.process.stdout` open, leaking the file descriptor. Consider closing both:
```python
if self.process:
    if self.process.stderr:
        self.process.stderr.close()
    if self.process.stdout:
        self.process.stdout.close()
```
```python
def getter() -> PointCloud2 | None:
    if self._lidar_seq > self._last_lidar_seq:
        self._last_lidar_seq = self._lidar_seq
        return self._latest_lidar
    return None

return self._create_stream(getter, _LIDAR_FPS, "Lidar")
```
Unsynchronised access to shared state in stream getters
The getter closures read and write `self._lidar_seq`, `self._last_lidar_seq`, and `self._latest_lidar` from polling threads while the LCM callback thread writes them concurrently. There is a TOCTOU window where `_latest_lidar` can be `None` even after the sequence check passes. A `threading.Lock` protecting reads and writes to these pairs would close the window.
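One way to close the window, sketched with a hypothetical `LatestValueSlot` helper (the name and API are illustrative) that bundles the value and the sequence counter behind one lock, so the check and the read happen as a single snapshot:

```python
import threading


class LatestValueSlot:
    """Thread-safe 'latest value + sequence' slot. The publisher
    (e.g. the LCM callback) writes under the lock; the polling
    getter takes a consistent (seq, value) snapshot, so the value
    can never be stale or None after the sequence check passes."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._seq = 0
        self._last_seen = 0
        self._value = None

    def publish(self, value) -> None:
        with self._lock:
            self._value = value
            self._seq += 1

    def poll(self):
        # Returns the latest value at most once per publication,
        # else None -- matching the existing getter contract.
        with self._lock:
            if self._seq > self._last_seen:
                self._last_seen = self._seq
                return self._value
            return None
```

Each stream (lidar, odom, video) would own one slot; the getter closure becomes `slot.poll` and the LCM callback calls `slot.publish(msg)`.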